package com.it.cloud.stream;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import java.util.HashMap;
import java.util.Map;

/**
 * Minimal Kafka Streams example.
 *
 * <p>Reads records from {@code my-input-topic}, converts each value to its
 * {@code String} form, and writes the result to {@code my-output-topic}.
 * Uses the legacy (pre-1.0) Streams API ({@link KStreamBuilder}).
 */
public class BasicStream {

    public static void main(String[] args) throws InterruptedException {
        Map<String, Object> props = new HashMap<>();
        // Identifier of this stream-processing application; must be unique within the Kafka cluster
        // (it namespaces consumer groups, internal topics, and local state directories).
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-application");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Default key serializer/deserializer.
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        // Default value serializer/deserializer.
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        StreamsConfig config = new StreamsConfig(props);

        KStreamBuilder builder = new KStreamBuilder();
        // Topology: my-input-topic -> mapValues(toString) -> my-output-topic.
        builder.stream("my-input-topic").mapValues(value -> value.toString()).to("my-output-topic");

        KafkaStreams streams = new KafkaStreams(builder, config);
        // Close the streams client cleanly on JVM shutdown so threads, network
        // connections, and local state stores are released properly.
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close, "streams-shutdown-hook"));
        streams.start();
    }
}
